import { EventEmitter } from 'events';
import { PassThrough, Transform, Readable } from 'stream';
import { guid } from './id.js';
import { isPlainObject } from "./is.js";
import { walkJson } from "./index.js";

const RESULTCODE_SUCCESS = '0';
const MESSAGETYPE_HEADER = '0';
const MESSAGETYPE_CONTENT = '1';
const MESSAGETYPE_RESULT = '2';

const ENCODEBUFFER = "ENCODEBUFFER";
const ENCODEFORMDATA = "ENCODEFORMDATA";
const ENCODEERROR = "ENCODEERROR";
const ENCODEREADABLE = "ENCODESTEAM";

export const utils = {
    getParams(params, body) {
        if (!params) {
            params = {};
        }
        if (!body) {
            body = Readable.from('');
        }
        if (!body instanceof Readable) {
            throw Error(`body must be a readable stream`);
        }
        return { params, body };
    },
    eachReadable(stream, fn) {
        return new Promise(resolve => {
            stream.on('data', value => fn(value));
            stream.on('end', () => resolve());
            stream.on('error', e => reject(e));
        });
    },
    fetchReadable(stream) {
        return new Promise((resolve, reject) => {
            let r = [];
            stream.on('data', value => {
                r.push(value);
            });
            stream.on('end', () => {
                resolve(Buffer.concat(r));
            });
            stream.on('error', e => reject(e));
        });
    }
}

class BaseSerialize {
    constructor(context) {
        this._context = context;
    }

    get context() {
        return this._context;
    }

    type() { }

    isEncode(data) {
    }

    isDecode(data) {
        return data && isPlainObject(data) && Reflect.has(data, "encode") && data.encode === this.type();
    }

    async encode(data) { }

    async decode(data) { }

    async getBuffer(blob) {
        return new Promise((resolve, reject) => {
            let fr = new FileReader();
            fr.readAsArrayBuffer(blob);
            fr.onloadend = e => resolve(e.target.result);
            fr.onerror = e => reject(e);
        });
    }
}

class ErrorSerialize extends BaseSerialize {
    constructor(context) {
        super(context);
    }

    type() {
        return ENCODEERROR;
    }

    isEncode(data) {
        return data instanceof Error;
    }

    async encode(error) {
        let to = this.normalize(error);
        return {
            name: to.constructor.name,
            message: to.message,
            stack: to.stack
        };
    }

    decode(obj) {
        let t = new Error();
        Object.assign(t, { name: obj.name, message: obj.message, stack: obj.stack });
        return t;
    }

    normalize(error) {
        if (typeof error === "object" && error instanceof Error) {
            return error;
        } else if (typeof error === "string") {
            return new Error(error);
        }
        return new Error(error || 'unknow error');
    }
}

class BufferSerialize extends BaseSerialize {
    constructor(context) {
        super(context);
    }

    type() {
        return ENCODEBUFFER;
    }

    isEncode(data) {
        return data instanceof ArrayBuffer;
    }

    async encode(data) {
        return {
            type: data.constructor.name,
            data: Array.from(data instanceof ArrayBuffer ? new Uint8Array(data) : data)
        };
    }

    async decode({ type, data }) {
        if (type === 'ArrayBuffer') {
            return new Uint8Array(data).buffer;
        } else {
            return new window[type](data);
        }
    }
}

class ReadableSerialize extends BaseSerialize {
    constructor(context) {
        super(context);
    }

    type() {
        return ENCODEREADABLE;
    }

    isEncode(data) {
        return data instanceof Readable;
    }

    async encode(file) {
        return {
            ...this.context.appendStream(file)
        };
    }

    async decode({ stream }) {
        // console.log('--------->', stream);
        let buffer = await utils.fetchReadable(stream);
        return Readable.from(buffer);
    }
}

class FormDataSerialize extends BaseSerialize {
    type() {
        return ENCODEFORMDATA;
    }

    isEncode(data) {
        return data instanceof FormData;
    }

    async encode(data) {
        let r = [], rr = {};
        data.forEach((value, key) => {
            r.push({ key, value });
        });
        await Promise.all(r.map(async ({ key, value }) => {
            rr[key] = await this.context.serializer.encode(value);
        }));
        return { data: rr };
    }

    async decode({ data }) {
        let r = new FormData(), rr = [];
        Reflect.ownKeys(data).forEach(key => rr.push({ key, value: data[key] }));
        await Promise.all(rr.map(async ({ key, value }) => {
            r.append(key, await this.context.serializer.decode(value));
        }));
        return r;
    }
}

const SerializeMap = [
    BufferSerialize,
    ErrorSerialize,
    ReadableSerialize,
    FormDataSerialize
];

class SerializerContext {
    constructor(serializer, streams = {}) {
        this._streams = streams;
        this._serializer = serializer;
    }

    get streams() {
        return this._streams;
    }

    get serializer() {
        return this._serializer;
    }

    appendStream(stream) {
        let id = guid();
        this._streams[id] = stream;
        return {
            formatId: id,
            formated: 'stream',
        };
    }
}

class EncodeSerializer {
    constructor() {
        this._context = new SerializerContext(this);
        this._serializes = SerializeMap.map(a => {
            return new a(this._context);
        });
    }

    get context() {
        return this._context;
    }

    isEncode(data) {
        let t = this._serializes.find(a => a.isEncode(data));
        return t != undefined;
    }

    async encode(data) {
        let t = this._serializes.find(a => a.isEncode(data));
        if (t) {
            let r = await t.encode(data);
            r.encode = t.type();
            return r;
        }
        return data;
    }
}

class DecodeSerializer {
    constructor(streams = {}) {
        this._context = new SerializerContext(this, streams);
        this._serializes = SerializeMap.map(a => {
            return new a(this._context);
        });
    }

    get context() {
        return this._context;
    }

    isDecode(data) {
        let t = this._serializes.find(a => a.isDecode(data));
        return t != undefined;
    }

    async decode(data) {
        let t = this._serializes.find(a => a.isDecode(data));
        if (t) {
            if (data.formated && data.formated === 'stream' && this.context.streams[data.formatId]) {
                data.stream = this.context.streams[data.formatId];
            }
            return t.decode(data);
        }
        return data;
    }
}

export const JsonSerialize = {
    async encode(data) {
        let serializer = new EncodeSerializer();
        let r = { __data__: data }, tasks = [];
        walkJson(r, null, null, (value, parent, key) => {
            if (serializer.isEncode(value)) {
                tasks.push(serializer.encode(value).then(s => {
                    if (parent) {
                        parent[key] = s;
                    }
                }));
                return false;
            }
            return true;
        });
        await Promise.all(tasks);
        return {
            result: r.__data__,
            streams: serializer.context.streams
        };
    },
    async decode(data, streams) {
        let serializer = new DecodeSerializer(streams);
        let r = { __data__: data }, tasks = [];
        walkJson(r, null, null, (value, parent, key) => {
            if (serializer.isDecode(value)) {
                tasks.push(serializer.decode(value).then(s => {
                    if (parent) {
                        parent[key] = s;
                    }
                }));
                return false;
            }
            return true;
        });
        await Promise.all(tasks);
        return r.__data__;
    }
}

class Request {
    constructor(id, params) {
        this._id = id;
        this._params = params;
        this._body = new PassThrough();
    }

    get id() {
        return this._id;
    }

    get body() {
        return this._body;
    }

    get params() {
        return this._params;
    }
};

class DataPacker extends Transform {
    constructor(params) {
        super();
        this._id = guid().replace(/-/g, '');
        this._params = params;
        this._isSendHeader = false;
    }

    _pack(flag, contentBuffer) {
        let header = Buffer.alloc(4),
            length = contentBuffer.byteLength,
            id = Buffer.from(this._id);
        flag = Buffer.from(flag);
        header.writeInt32BE(length + 32 + 1);
        return Buffer.concat([header, flag, id, contentBuffer], length + 37);
    }

    _sendParams() {
        if (!this._isSendHeader) {
            this._isSendHeader = true;
            this.push(this._pack(MESSAGETYPE_HEADER, Buffer.from(JSON.stringify(this._params))));
        }
    }

    _flush(callback) {
        this._sendParams();
        this.push(this._pack(MESSAGETYPE_RESULT, Buffer.from(RESULTCODE_SUCCESS)));
        callback();
    }

    _transform(chunk, encoding, callback) {
        this._sendParams();
        this.push(this._pack(MESSAGETYPE_CONTENT, chunk));
        callback();
    }
}

class DataUnpacker extends EventEmitter {
    constructor(readable) {
        super();
        this._before = null;
        this._map = new Map();
        readable.on('data', chunk => {
            this._get(chunk);
        });
    }

    _decode(chunk) {
        let type = chunk.toString('utf-8', 0, 1);
        let id = chunk.toString('utf-8', 1, 33);
        let content = chunk.slice(33);
        if (type == MESSAGETYPE_HEADER) {
            let params = JSON.parse(content.toString());
            if (!this._map.has(id)) {
                let request = new Request(id, params);
                this._map.set(id, request);
                this.emit('request', request);
            }
        } else if (type == MESSAGETYPE_CONTENT) {
            let request = this._map.get(id);
            if (request) {
                request.body.write(content);
            }
        } else if (type == MESSAGETYPE_RESULT) {
            let request = this._map.get(id);
            if (request) {
                this._map.delete(id);
                request.body.end();
            }
        }
        this.emit('unpackmessage', id);
    }

    _get(chunk) {
        if (this._before) {
            chunk = Buffer.concat([this._before, chunk]);
        }
        if (chunk.byteLength > 4) {
            let start = 4, end = chunk.readUInt32BE(0) + start;
            if (end > chunk.byteLength) {
                this._before = chunk;
            } else {
                this._decode(chunk.slice(start, end));
                this._before = null;
                this._get(chunk.slice(end));
            }
        } else {
            this._before = chunk;
        }
    }
}

class DataSenderImpl {
    constructor(writeable) {
        this._writeable = writeable;
    }

    send({ params, body }) {
        let r = utils.getParams(params, body);
        return new Promise((resolve, reject) => {
            r.body.pipe(new DataPacker(r.params)).on('data', chunk => {
                this._writeable.write(chunk);
            }).on('error', e => reject(e)).on('end', () => resolve());
        });
    }

    async sendOnce({ params, body }) {
        let r = utils.getParams(params, body);
        return new Promise((resolve, reject) => {
            r.body.pipe(new DataPacker(r.params)).pipe(this._writeable).on('end', () => resolve()).on('error', e => reject(e));
        });
    }
}

class DataTransporterImpl extends EventEmitter {
    constructor(duplex) {
        super();
        this._duplex = duplex;
        new DataUnpacker(duplex).on('request', e => this.emit('request', e));
    }

    transport({ params, body }) {
        let r = utils.getParams(params, body);
        return new Promise((resolve, reject) => {
            r.body.pipe(new DataPacker(r.params)).on('data', chunk => {
                this._duplex.write(chunk);
            }).on('error', e => reject(e)).on('end', () => resolve());
        });
    }
}

class MessageSenderImpl {
    constructor(writable) {
        this._msg = new Map();
        this._sender = DataSender.from(writable);
    }

    async send(params) {
        let pid = guid();
        let { result, streams } = await JsonSerialize.encode(params);
        this._msg.set(pid, new Map());
        return this._sender.send({
            params: { id: pid, step: 'start', data: result, streams: Reflect.ownKeys(streams) }
        }).then(() => {
            let t = [];
            Reflect.ownKeys(streams).forEach(id => {
                t.push({ id, stream: streams[id] });
            });
            return Promise.all(t.map(({ id, stream }) => {
                return this._sender.send({
                    params: { id: pid, step: 'process', streamId: id },
                    body: stream
                });
            })).then(() => {
                return this._sender.send({
                    params: { id: pid, step: 'end' }
                });
            });
        });
    }
}

class MessageReceiverImpl extends EventEmitter {
    constructor(readable) {
        super();
        this._msg = new Map();
        this._receiver = DataReceiver.from(readable);
        this._receiver.on('request', async ({ params, body }) => {
            let { step, data, streamId, streams } = params;
            if (step === 'start') {
                if (!this._msg.has(params.id)) {
                    let streamMap = {};
                    streams.forEach(stmId => {
                        streamMap[stmId] = new PassThrough();
                    });
                    this._msg.set(params.id, streamMap);
                    let result = await JsonSerialize.decode(data, streamMap);
                    this.emit('request', result);
                }
            } else if (step === 'process') {
                if (this._msg.has(params.id) && this._msg.get(params.id)[streamId]) {
                    body.pipe(this._msg.get(params.id)[streamId]);
                }
            } else {
                if (this._msg.has(params.id)) {
                    this._msg.delete(params.id);
                }
            }
        });
    }
}

class MessageObserverImpl {
    constructor({ readable, writable }) {
        this._map = new Map();
        this._subscriber = new Map();
        this._sender = MessageSender.from(writable);
        this._receiver = MessageReceiver.from(readable);
        this._receiver.on('request', async info => {
            let { id, code, type, error, params, data } = info;
            if (type) {
                if (this.isNotify(type, params)) {
                    Promise.resolve().then(() => this.notify(type, params)).then(data => {
                        this._sender.send({ id, code: 0, data });
                    }).catch(e => {
                        this._sender.send({ id, code: 1, error: e });
                    });
                }
                return;
            }
            if (code != undefined) {
                if (this._map.has(id)) {
                    let { resolve, reject } = this._map.get(id);
                    if (code == 0) {
                        resolve(data);
                    } else {
                        reject(error);
                    }
                }
            }
        });
    }

    get sender() {
        return this._sender;
    }

    get receiver() {
        return this._receiver;
    }

    isNotify(type, params) {
        return this._subscriber.has(type);
    }

    notify(type, params) {
        return this._subscriber.get(type)(params);
    }

    publish(type, params) {
        return new Promise((resolve, reject) => {
            let id = guid();
            this._sender.send({ id, type, params });
            this._map.set(id, { resolve, reject });
        });
    }

    subscribe(type, fn) {
        this._subscriber.set(type, fn);
        return this;
    }
}

export const DataSender = {
    from(writeable) {
        return new DataSenderImpl(writeable);
    }
}

export const DataReceiver = {
    from(readable) {
        return new DataUnpacker(readable);
    }
}

export const DataTransporter = {
    from(duplex) {
        return new DataTransporterImpl(duplex);
    }
}

export const MessageSender = {
    from(writable) {
        return new MessageSenderImpl(writable);
    }
};

export const MessageReceiver = {
    from(readable) {
        return new MessageReceiverImpl(readable);
    }
}

export const MessageObserver = {
    form({ readable, writable }) {
        return new MessageObserverImpl({ readable, writable });
    }
}